Skip to content

Conversation

@Samrat002
Copy link
Contributor

What is the purpose of the change

YarnJobListFetcher Implementation

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changes to the CustomResourceDescriptors: (yes / no)
  • Core observer or reconciler logic that is regularly executed: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@Samrat002 Samrat002 marked this pull request as ready for review September 23, 2025 05:17
@1996fanrui 1996fanrui self-assigned this Sep 23, 2025
Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Samrat002 , thanks for picking it up! and sorry for the late review.

I have some questions about this PR, please help take a look when you are available, thanks

--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
```
When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set the host/port to the YARN web proxy endpoint

Do you mean autoscaler.standalone.fetcher.flink-cluster.host and autoscaler.standalone.fetcher.flink-cluster.port?

If yes, it does not make sense. Because all config options with autoscaler.standalone.fetcher.flink-cluster prefix are related to flink-cluster. It is better to introduce yarn cluster related config options.

To select the job fetcher use:
```
--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about introducing a whole demo for yarn mode?

We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call
`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the
`JobListFetcher` interface. that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's why Flink Autoscaler Standalone only supports a single Flink cluster so far.

It no longer makes sense after adding YARN support, and the sentence should either be removed or rewritten to explain that each fetcher instance still monitors a single cluster or YARN deployment.

Comment on lines +92 to +95
default:
return (JobListFetcher<KEY, Context>)
new FlinkClusterJobListFetcher(
clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of AutoscalerStandaloneOptions.FETCHER_TYPE is FLINK_CLUSTER, so including default case that falls back to FLINK_CLUSTER here does not make sense, because it silently accepts invalid configuration values. Throwing an exception for unknown fetcher types is better. It could prevent potential bugs if introducing new type in the future.

Comment on lines +57 to +62
public static final ConfigOption<FetcherType> FETCHER_TYPE =
autoscalerStandaloneConfig("fetcher.type")
.enumType(FetcherType.class)
.defaultValue(FetcherType.FLINK_CLUSTER)
.withDescription(
"The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/apache/flink-kubernetes-operator/blob/main/docs/README.md

Please generate docs according to this doc. Also, IIRC, it is not needed to mentioned values, and doc tools will list all values by default.

Comment on lines +157 to +158
} catch (Throwable ignore) {
// Ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It suppresses all exceptions including critical ones like OutOfMemoryError without any logging, making it impossible to diagnose why YARN-based job discovery failed, such as: do not know if there are configuration issues, network problems, or authentication failures.

return discovered;
}

// use supplied client factory (may point to direct JM or a reverse proxy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why fallback to JM or flink cluster here? If this is what the user expects, why choosing yarn cluster fetcher instead of flink cluster fetcher?

Comment on lines +104 to +108
yarnClient = YarnClient.createYarnClient();
org.apache.hadoop.conf.Configuration yarnConf =
new org.apache.hadoop.conf.Configuration();
yarnClient.init(yarnConf);
yarnClient.start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating YarnClient without any Hadoop configuration, I am not sure whether it works. Generally, it needs Hadoop configuration files like core-site.xml or yarn-site.xml that might be present in the classpath.

}
break;
}
} catch (Throwable ignore) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch does not provide fault isolation among jobs or yarn applications, if one job is stuck on GC or something else, the autoscaler won't work for all applciations.

Comment on lines +104 to +108
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to minimize the scope of dependencies? For example, only yarn-client is added here.

Also, is it needed to exclude some dependencies to avoid dependency conflicts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants